library(tidyverse)
## Warning: package 'tidyverse' was built under R version 3.5.3
## -- Attaching packages --------------------------------------------------------------------------------- tidyverse 1.2.1 --
## v ggplot2 3.1.0 v purrr 0.2.5
## v tibble 1.4.2 v dplyr 0.7.8
## v tidyr 0.8.2 v stringr 1.3.1
## v readr 1.3.1 v forcats 0.3.0
## Warning: package 'readr' was built under R version 3.5.2
## Warning: package 'forcats' was built under R version 3.5.2
## -- Conflicts ------------------------------------------------------------------------------------ tidyverse_conflicts() --
## x dplyr::filter() masks stats::filter()
## x dplyr::lag() masks stats::lag()
library(DT)
# Read a single raw benchmark CSV (which has no header row) and attach
# the standard column names used throughout this analysis.
read.clean.files = function(filename){
  raw = read.csv(filename, header = FALSE)
  colnames(raw) = c("Language", "Randomize", "Dataset", "MachineID",
                    "RunID", "Type", "Operation", "TimeTaken")
  raw
}
# Locate every result CSV under ../Results/ and build one data frame per
# environment (Databricks vs. Local VM), tagged with a Setup column.
# NOTE: the dot in the pattern is escaped so only true ".csv" suffixes match;
# the original ".csv$" treated the dot as a regex wildcard.
files = list.files(path = "../Results/", pattern = "\\.csv$", recursive = TRUE, full.names = TRUE) # List all .csv files
#files
databricks.files = files[grepl("Databricks", files)]
local.vm.files = files[grepl("Local_VM", files)] # renamed from local.vm..files (double-dot typo)
# read.clean.files() already applies the standard eight column names,
# so the duplicated colnames() assignment below is no longer needed.
rows.databricks = lapply(databricks.files, read.clean.files) # Read the files into a list
merged.databricks = do.call(rbind, rows.databricks) # combine the data.frames
merged.databricks$Setup = 'Databricks'
rows.local.vm = lapply(local.vm.files, read.clean.files) # Read the files into a list
merged.local.vm = do.call(rbind, rows.local.vm) # combine the data.frames
merged.local.vm$Setup = 'Local VM'
merged_data = rbind(merged.databricks, merged.local.vm)
merged_data$Setup = as.factor(merged_data$Setup)
# Normalise Type labels ("... Operations" -> "... Operation"), drop the first
# run (RunID 1 — presumably a warm-up run; verify with the benchmark authors),
# convert the grouping columns to factors, and reduce Dataset to its nominal
# size in MB (e.g. "dataset_100MB" -> "100").
merged_data$Type = as.factor(gsub("Operations", "Operation", merged_data$Type))
# Filter before RunID becomes a factor so the numeric comparison is unambiguous.
merged_data = merged_data %>% filter(RunID != 1)
for (col in c("MachineID", "Randomize", "RunID")) {
  merged_data[[col]] = as.factor(merged_data[[col]])
}
merged_data$Dataset = as.factor(sub("MB$", "", sub("dataset_", "", merged_data$Dataset)))
str(merged_data)
## 'data.frame': 4134 obs. of 9 variables:
## $ Language : Factor w/ 2 levels "Scala","PySpark": 1 1 1 1 1 1 1 1 1 1 ...
## $ Randomize: Factor w/ 1 level "1": 1 1 1 1 1 1 1 1 1 1 ...
## $ Dataset : Factor w/ 5 levels "10","100","200",..: 5 5 5 5 5 5 5 5 5 5 ...
## $ MachineID: Factor w/ 2 levels "1","2": 1 1 1 1 1 1 1 1 1 1 ...
## $ RunID : Factor w/ 5 levels "2","3","4","5",..: 1 1 1 1 1 1 1 1 1 1 ...
## $ Type : Factor w/ 4 levels "Aggregate Operation",..: 1 1 1 2 2 1 4 4 3 4 ...
## $ Operation: Factor w/ 37 levels " Filter"," Filter Reg Ex 1",..: 5 4 14 20 21 6 2 3 11 27 ...
## $ TimeTaken: num 132.6 36.4 32 176.2 179.6 ...
## $ Setup : Factor w/ 2 levels "Databricks","Local VM": 1 1 1 1 1 1 1 1 1 1 ...
head(merged_data)
## Language Randomize Dataset MachineID RunID Type
## 1 Scala 1 500 1 2 Aggregate Operation
## 2 Scala 1 500 1 2 Aggregate Operation
## 3 Scala 1 500 1 2 Aggregate Operation
## 4 Scala 1 500 1 2 Column Operation
## 5 Scala 1 500 1 2 Column Operation
## 6 Scala 1 500 1 2 Aggregate Operation
## Operation TimeTaken Setup
## 1 GroupBy 10 columns 132.582 Databricks
## 2 GroupBy 1 column 36.444 Databricks
## 3 Ranking by Group 32.050 Databricks
## 4 Sorting Desc 1 column 176.199 Databricks
## 5 Sorting Desc 10 column 179.554 Databricks
## 6 GroupBy 5 columns 52.993 Databricks
summary(merged_data)
## Language Randomize Dataset MachineID RunID
## Scala :2647 1:4134 10 :1005 1:2555 2:906
## PySpark:1487 100: 960 2:1579 3:889
## 200: 712 4:852
## 300: 797 5:781
## 500: 660 6:706
##
##
## Type Operation
## Aggregate Operation: 558 Merge 2 columns into 1 : 142
## Column Operation :2027 Merge 5 columns into 1 : 142
## Mixed Operation : 423 Pivot 1 Rows and 1 Column: 142
## Row Operation :1126 Shift (Lag) : 142
## Split 1 Column into 10 : 142
## Filter : 141
## (Other) :3283
## TimeTaken Setup
## Min. : 0.237 Databricks:3092
## 1st Qu.: 3.354 Local VM :1042
## Median : 10.271
## Mean : 24.995
## 3rd Qu.: 26.175
## Max. :275.332
##
# Measured on-disk sizes (in MB) of the benchmark datasets. The 10/100/500MB
# values were computed via file.size(".../dataset_NMB.csv")/(1024*1024);
# the 200/300MB values were recorded directly.
size_10MB = 11.4789848327637
size_100MB = 115.640992164612
size_200MB = 229.8573
size_300MB = 343.2709
size_500MB = 576.678165435791
dataset_labels = c("10", "100", "200", "300", "500")
dataset_sizes = c(size_10MB, size_100MB, size_200MB, size_300MB, size_500MB)
# Report the actual size for each nominal dataset size.
for (i in seq_along(dataset_labels)) {
  print(paste(sprintf("Actual Size of %sMB file (in MB)", dataset_labels[i]),
              dataset_sizes[i]))
}
# Lookup table used later to attach Size to each benchmark row.
size_info = data.frame(Dataset = dataset_labels, Size = dataset_sizes)
str(size_info)
## 'data.frame': 5 obs. of 2 variables:
## $ Dataset: Factor w/ 5 levels "10","100","200",..: 1 2 3 4 5
## $ Size : num 11.5 115.6 229.9 343.3 576.7
# Attach each dataset's actual size (MB) and derive throughput as
# size divided by the recorded time taken.
merged_data = merge(merged_data, size_info)
merged_data$Throughput = merged_data$Size / merged_data$TimeTaken
Common Functions
# Summarise an already-grouped data frame: per-group observation count,
# mean and standard deviation of TimeTaken and Throughput, and their
# coefficients of variation. The coefficient of variation is defined as
# sd / mean — the original computed mean / sd, which is the inverse
# (a signal-to-noise ratio, not a CV).
summarize_results = function(grouped_data){
  rv = grouped_data %>%
    summarise(n = n()
              ,Mean_Time = round(mean(TimeTaken),2)
              ,Std_Dev_Time= round(sd(TimeTaken),2)
              ,Coeff_Var_Time = round(Std_Dev_Time/Mean_Time,2)
              ,Mean_Throughput = round(mean(Throughput),2)
              ,Std_Dev_Throughput= round(sd(Throughput),2)
              ,Coeff_Var_Throughput = round(Std_Dev_Throughput/Mean_Throughput,2)
    )
  return(rv)
}
# Draw one free-scale boxplot facet of TimeTaken per group of `grouped_data`,
# filled by the column named in `by_var` (a string). Returns the input data
# with an added factor column `Index` identifying each group, so callers can
# look up which rows belong to a facet of interest.
plot_hist = function(grouped_data, by_var){
  group_ids = dplyr::group_indices(grouped_data)
  grouped_data$Index = as.factor(group_ids)
  p = ggplot(grouped_data, aes_string(x = "Index", y = "TimeTaken", fill = by_var)) +
    geom_boxplot() +
    facet_wrap(~Index, scales = 'free', ncol = 4)
  print(p)
  return(grouped_data)
}
Databricks vs. Local VM
Table
# Databricks vs. Local VM: summary statistics for every
# operation/language/machine/dataset/setup combination.
result = merged_data %>%
  group_by(Type, Operation, Language, MachineID, Dataset, Setup) %>%
  summarize_results()
DT::datatable(result)
Plots
# Boxplots per group (Setup varies within each facet).
group = merged_data %>%
  group_by(Type, Operation, Language, MachineID, Dataset)
group2 = plot_hist(grouped_data = group, by_var = "Setup")

# Evaluate outliers. plot_hist() returns the data frame itself with an Index
# column, so subset it directly; the original `group2$data[...]` referenced a
# nonexistent column and triggered "Unknown or uninitialised column" warnings.
DT::datatable(group2[group2$Index == 39, ])
## Warning: Unknown or uninitialised column: 'data'.
## Warning: Unknown or uninitialised column: 'data'.
Comparison between dataset sizes
# Dataset-size comparison: same summary, with Dataset as the innermost group.
result = merged_data %>%
  group_by(Type, Operation, Language, MachineID, Setup, Dataset) %>%
  summarize_results()
DT::datatable(result)
Observations
- Throughput (MB/Time) does not remain constant.
- For column operations, it increases from 10MB to 100MB, but decreases from 100MB to 500MB, implying that there is a sweet spot.
Plots
# Boxplots per group, coloured by dataset size within each facet.
group = merged_data %>%
  group_by(Type, Operation, Language, MachineID, Setup)
group2 = plot_hist(grouped_data = group, by_var = "Dataset")

# Evaluate outliers for a facet of interest, e.g.:
#DT::datatable(group2[group2$Index == 39,])
Comparison between Scala and PySpark
# Scala vs. PySpark comparison: Language as the innermost grouping variable.
result = merged_data %>%
  group_by(Type, Operation, Dataset, MachineID, Setup, Language) %>%
  summarize_results()
DT::datatable(result)
Plots
# Boxplots per group, split by Language within each facet.
group = merged_data %>%
  group_by(Type, Operation, Dataset, MachineID, Setup)
group2 = plot_hist(grouped_data = group, by_var = "Language")
Observations
- In general, Scala seems to be faster than PySpark, which is consistent with theory.
#Evaluate outliers
#DT::datatable(group2$data[group2$data$Index == 39,])
Comparison between Machine 1 and Machine 2
# Machine 1 vs. Machine 2 comparison: MachineID as the innermost group.
result = merged_data %>%
  group_by(Type, Operation, Language, Dataset, Setup, MachineID) %>%
  summarize_results()
DT::datatable(result)
Plots
# Boxplots per group, coloured by MachineID within each facet.
group = merged_data %>%
  group_by(Type, Operation, Language, Dataset, Setup)
group2 = plot_hist(grouped_data = group, by_var = "MachineID")

# Evaluate outliers for a facet of interest, e.g.:
#DT::datatable(group2[group2$Index == 39,])